package com.huan.flink.union;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 通过 union 来进行多条流的合并
 * 1、通过union来进行合流，流中元素的类型必须都要一致
 * 2、union一次可以合并多个流
 *
 * @author huan.fu
 * @date 2023/9/23 - 10:53
 */
public class UnionStreamApplication {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建3条数据流
        DataStreamSource<Integer> source1 = environment.fromElements(1, 2, 3, 4);
        DataStreamSource<Integer> source2 = environment.fromElements(11, 22, 33, 44, 55, 66);
        // 本地通过 nc -lk 8888 启动一个socket
        DataStreamSource<String> source3 = environment.socketTextStream("localhost", 8888);

        // 将3条数据流进行合并
        source1.union(source2)
                // union 合流， 流中元素的数据类型必须都要一样
                .union(source3.map(Integer::valueOf))
                .print();

        environment.execute("合流");
    }
}
